S3からBigQueryへ連携する方法いろいろ(Omni/Transfer Service)
データアナリティクス事業本部、池田です。花粉症です。
Google CloudのBigQuery Omniを使ってみたかったので、他の連携方法と比較してみました。
切りが無さそうだったので、ソースはS3に限定し、
Google Cloudのサービスや機能でできる(作り込みが要らない)方法として3つを試しました。
また、AWS側は再利用できるように CloudFormation
でテンプレートにしました。
先にまとめ
各ガイドや触ってみた結果を基に、
個人的に選定時のポイントになりそうと思ったところをまとめてみました。
※制約や挙動などは執筆時点(2022/04/14)のものです。
BigQuery Omni |
|
---|---|
BigQuery Data Transfer Service |
|
Cloud Storage Transfer Service
+ BigQuery 外部テーブル |
|
(04/19追記) Cross-cloud transfer (「LOAD DATA INTO」文)についてはGAされたら追記します。
(11/10追記) LOAD DATA文 がGAされました。
実際の開発では目的とこの辺の条件に、データの性質(upsert要るかなど)やらネットワーク構成やら料金やらを交えて検討する感じでしょーか…
BigQuery Omni
初めは BigQuery Omni
を使ってBigQueryからS3に対してクエリを発行してみます。
Omniは定額料金のみなので、
Flex Slots の100スロットを設定して検証しました。
↓公式ガイドの構築手順。
【 Use BigQuery Omni with AWS 】
※執筆時点(2022/04/14)では日本語版のガイドは古いと思われる記載が見受けられたので英語での参照をお勧めします。
Google Cloud側準備
BigQuery Connection APIを有効化します。
Cloud Shell からbqコマンドで BigQueryの接続を作成します。
app_name=bqomni-s3-access aws_account_id=<AWSのアカウントID> aws_iam_role_id="arn:aws:iam::${aws_account_id}:role/${app_name}-role" bq mk --connection --connection_type='AWS' \ --iam_role_id=$aws_iam_role_id \ --location=aws-us-east-1 \ ${app_name}-conn
執筆時点で選択できるリージョンは aws-us-east-1
だけでした。(というかこういう名前のリージョン(location)が有ることに少し驚きました。)
作成できると以下のような結果が返ってくるので、あとで使用する<Identity>の部分を控えておきます。
Connection <プロジェクト番号>.aws-us-east-1.bqomni-s3-access-conn successfully created Please add the following identity to your AWS IAM Role 'arn:aws:iam::<AWSのアカウントID>:role/bqomni-s3-access-role' Identity: '<Identity>'
BigQueryのコンソールからも作成結果やIdentityを確認できます。
※ガイドの手順だと「AWSのIAMロール仮作成→接続作成→AWSのIAMロールにIdentity設定」のような流れですが、 私はめんどくさいので「IAMロール名は決め打ちで接続作成→AWSのIAMロール作成」で作成しています。
(2022/11/11追記) ストレージへのアクセスなどは BigLake の機能として整備されました。 上画像の接続タイプも「AWS 上の BigLake(BigQuery Omni 経由)」と表示が変わっています。
AWS側準備
Google Cloudに接続を許可するため、IAMのロールとポリシーを作成します。 フェデレーションで設定できるようです。
↓CloudFormationで。前節のIdentityをパラメータにしてやります。
AWSTemplateFormatVersion: "2010-09-09" Description: "IAM Role for BigQuery Omni" Parameters: AppName: Type: String Default: "bqomni-s3-access" BucketName: Type: String BucketPrefix: Type: String BqIdentity: Type: String Description: "the result of 'bq mk --connection ...'" Resources: BqS3AccessRole: Type: AWS::IAM::Role Properties: RoleName: !Sub "${AppName}-role" AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: sts:AssumeRoleWithWebIdentity Principal: Federated: "accounts.google.com" Condition: StringEquals: accounts.google.com:sub: !Ref BqIdentity MaxSessionDuration: 43200 ManagedPolicyArns: - !Ref BqS3AccessPolicy BqS3AccessPolicy: Type: AWS::IAM::ManagedPolicy Properties: ManagedPolicyName: !Sub "${AppName}-policy" PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - s3:ListBucket Resource: - !Sub "arn:aws:s3:::${BucketName}" Condition: StringLike: s3:prefix: !Sub "${BucketPrefix}/*" - Effect: Allow Action: - s3:GetObject - s3:PutObject # If you need to write to S3. Resource: - !Sub "arn:aws:s3:::${BucketName}/${BucketPrefix}/*"
S3は作成済みのものを使用しました。
( us-east-1
リージョンでないバケットでは、あとのクエリ実行でエラーになりました。)
BigQueryテーブル作成
BigQueryのコンソールからSQLを発行してデータセットと外部テーブルを作成します。
↓データセットを aws-us-east-1
で作成。
CREATE SCHEMA bqomni_s3_access OPTIONS( location="aws-us-east-1" );
この時、定額料金を割り当てていないとエラーになりました。(逆に言えば、ここまではコミットを準備しなくても構築できそうです。)
BQ Omni Region aws-us-east-1 currently only supports flat rate slots. To continue running a query, please purchase a BQ Omni commitment and create a reservation.
↓作成した接続を使って外部テーブルを作成。
CREATE EXTERNAL TABLE bqomni_s3_access.sample_tbl ( station_number INTEGER, year INTEGER, month INTEGER, day INTEGER, fog BOOLEAN, rain BOOLEAN, snow BOOLEAN, hail BOOLEAN, thunder BOOLEAN, tornado BOOLEAN ) WITH CONNECTION `aws-us-east-1.bqomni-s3-access-conn` OPTIONS( format="JSON", compression="GZIP", uris=["s3://<S3バケット名>/bq-s3-access/*"] );
(2~5行目のカラム定義は無くても良いです。自動で検出してくれます。)
使ってみる・感想
↓テーブルを作成すると、BigQueryのコンソールからクエリが発行でき、S3の中身を参照できました。
S3に新たにファイルを追加してみると、その結果はすぐにクエリ結果にも反映されました。
ただし、出力結果が多過ぎると以下のエラーになりました。
Response too large to return. Please consider using EXPORT DATA.
ガイド によると2MBまでの制限があるそうです。
それ以上はS3にエクスポートしろってことですかね。
The maximum result size for interactive queries is 2 MB.
また、作成した外部テーブルをBigQuery内に永続化できないかとCTASやINSERTをしようとしてみましたが、 できなそうな感じでした。 S3からデータを持ち出させないための仕組みなのでしょうか…?
You can't create standard tables in BigQuery Omni. BigQuery Omni only supports external tables.
現時点では制約も多いのですが、S3を直接参照できるのは便利だと思いました。 Flex Slotsの準備も含め、それほど時間をかけずに構築できました。
BigQuery Data Transfer Service
次に BigQuery Data Transfer Service で、S3のデータをBigQueryのテーブルに転送してみます。
↓公式ガイドの設定手順。
【 Overview of Amazon S3 transfers 】
AWS側準備
アクセスキーでの設定方法しか無いようなので、IAMユーザーを作成します。
↓CloudFormation。
AWSTemplateFormatVersion: "2010-09-09" Description: "IAM User for BigQuery Data Transfer Service" Parameters: AppName: Type: String Default: "bqdts-s3-access" Resources: BqS3AccessUser: Type: AWS::IAM::User Properties: UserName: !Sub "${AppName}-user" ManagedPolicyArns: - "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
前章のOmniと同じようなポリシーだとエラーになったので、
ガイド
に従って AmazonS3ReadOnlyAccess
を与えています。
At a minimum, the Amazon S3 source data must have the AWS managed policy AmazonS3ReadOnlyAccess applied to it.
IAMユーザー作成後、コンソールからアクセスキー(アクセスキー ID&シークレットアクセスキー)を発行しておきます。
BigQueryテーブル作成
BigQueryのコンソールからSQLを発行して、データセットと宛先になるテーブルを作成しておきます。
CREATE SCHEMA bqdts_s3_access OPTIONS( location="US" ); CREATE TABLE bqdts_s3_access.sample_tbl ( station_number INTEGER, year INTEGER, month INTEGER, day INTEGER, fog BOOLEAN, rain BOOLEAN, snow BOOLEAN, hail BOOLEAN, thunder BOOLEAN, tornado BOOLEAN );
ちなみに、S3のバケットが ap-northeast-1
、データセットが US
という構成でも動作はしました。
Google Cloud側準備
BigQuery Data Transfer APIを有効化します。
Cloud ShellからbqコマンドでBigQueryの転送構成を作成します。 前々節のアクセスキーをここで使います。
app_name=bqdts-s3-access bq mk --transfer_config --data_source=amazon_s3 \ --display_name=${app_name}-dts \ --target_dataset=bqdts_s3_access \ --params='{ "destination_table_name_template":"sample_tbl", "data_path":"s3://<S3バケット名>/bq-s3-access/*", "access_key_id":"<AWSのアクセスキー ID>", "secret_access_key":"<AWSのシークレットアクセスキー>", "file_format":"JSON" }'
↓作成結果。
コマンドで作成するとスケジュールは「24時間ごと」で設定されるようです。
また、 ガイド
によると転送の最小間隔も24時間とのことです。
The minimum interval time between recurring transfers is 24 hours. The default interval for a recurring transfer is 24 hours.
作成が成功すると、すぐに転送が始まりました。
使ってみる・感想
↓転送実行の完了後、BigQueryのコンソールからクエリが発行でき、転送されたデータを参照できました。
S3にファイルを追加して動きを試してみたのですが、
24時間以内でも手動で転送を実行することで、追加分も連携することができました。
ただし、ファイルアップロードした直後は転送対象にならず、少し待つ必要があるようです。
When you transfer data from Amazon S3, it is possible that some of your data will not be transferred to BigQuery, particularly if the files were added to the bucket very recently. It should take approximately 10 minutes for a file to become available to the BigQuery Data Transfer Service after it is added to the bucket.
今回やってみた中では一番設定が楽だった気がします。たぶん。
S3だけでなく Redshiftでも使える
ようなので、便利そうです。
Cloud Storage Transfer Service + BigQuery 外部テーブル
最後は Cloud Storage Transfer Service です。 このサービスでは「S3バケット→Cloud Storageバケット」の転送になるので、 BigQueryの外部テーブル を使って、 Cloud StorageのファイルをBigQueryで参照します。
↓公式ガイドの設定手順はこの辺。
【 Configuring access to data sources and sinks >Amazon S3 >Federated identity】
【 CREATE EXTERNAL TABLE statement 】
Google Cloud側準備1
Storage Transfer APIを有効化します。
S3とはアクセスキーとフェデレーションでの接続方法が選べるようなので、後者で進めます。
googleServiceAccounts.get
のAPIを使って転送時に使用されるサービスアカウントの情報を取得します。
【 Method: googleServiceAccounts.get 】
このサイトからAPIを実行して…
以下のような情報が手に入るので、あとで使用する<subjectId>の値を控えておきます。
{ "accountEmail": "project-<プロジェクト番号>@storage-transfer-service.iam.gserviceaccount.com", "subjectId": "<subjectId>" }
また、前述のサービスアカウントがCloud Storageを操作できるように、
Cloud Shellからgsutilコマンドで権限を付与します。
Storage レガシー バケット書き込み
が必要とのことです。
gsutil iam ch serviceAccount:project-<プロジェクト番号>@storage-transfer-service.iam.gserviceaccount.com:legacyBucketWriter \ gs://<Cloud Storageバケット名>
Cloud Storageは作成済みのものを使用しました。
ちなみに、S3のバケットが ap-northeast-1
、Cloud Storageのバケットが us-east1
という構成でも動作はしました。
AWS側準備
フェデレーション設定のIAMのロールとポリシーを作成します。
↓CloudFormation。前節でAPIで取得したサービスアカウントのsubjectIdを使います。
AWSTemplateFormatVersion: "2010-09-09" Description: "IAM Role for Cloud Storage Transfer Service" Parameters: AppName: Type: String Default: "sts-s3-access" BucketName: Type: String BucketPrefix: Type: String StsSubjectId: Type: String Description: "the result of 'https://cloud.google.com/storage-transfer/docs/reference/rest/v1/googleServiceAccounts/get'" Resources: StsS3AccessRole: Type: AWS::IAM::Role Properties: RoleName: !Sub "${AppName}-role" AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: sts:AssumeRoleWithWebIdentity Principal: Federated: "accounts.google.com" Condition: StringEquals: accounts.google.com:sub: !Ref StsSubjectId ManagedPolicyArns: - !Ref StsS3AccessPolicy StsS3AccessPolicy: Type: AWS::IAM::ManagedPolicy Properties: ManagedPolicyName: !Sub "${AppName}-policy" PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - s3:ListBucket Resource: - !Sub "arn:aws:s3:::${BucketName}" Condition: StringLike: s3:prefix: !Sub "${BucketPrefix}/*" - Effect: Allow Action: - s3:GetBucketLocation Resource: - !Sub "arn:aws:s3:::${BucketName}" - Effect: Allow Action: - s3:GetObject # - s3:DeleteObject # If you need to delete from S3. Resource: - !Sub "arn:aws:s3:::${BucketName}/${BucketPrefix}/*"
ポリシーの内容は この辺のガイド を参考にしています。
Google Cloud側準備2
transferJobs.create
のAPIを使って転送のジョブを作成します。
(gcloudコマンドでもジョブ作成はできますが、
執筆時点でフェデレーションのロールの指定の方法がよく分からない(未対応?)ので、APIにしました。)
【 Method: transferJobs.create 】
このサイトから,、以下のようなリクエストボディでAPIを実行します。
{ "name": "transferJobs/s3-access", "projectId": "<プロジェクト番号>", "transferSpec": { "awsS3DataSource": { "bucketName": "<S3バケット名>", "roleArn": "arn:aws:iam::<AWSのアカウントID>:role/sts-s3-access-role" }, "gcsDataSink": { "bucketName": "<Cloud Storageバケット名>", "path": "sts/" }, "objectConditions": { "maxTimeElapsedSinceLastModification": "604800s", "includePrefixes": "bq-s3-access/" } }, "schedule": { "repeatInterval": "3600s", "scheduleStartDate": { "day": 1, "month": 4, "year": 2022 } }, "status": "ENABLED" }
↑では7日(604800s)以内に作成/更新されたファイルを1時間(3600s)ごとに転送するような設定にしています。
開始日( scheduleStartDate
)は必須と言われたので、適当な過去日付を入れました。
作成の際に、AWS側の設定などが誤っているとエラーになり、ジョブを作成できませんでした。
また、転送の間隔( repeatInterval
)は1時間以上にする必要があるそうです。
↓作成結果。
「ENABLED」を設定していたので、作成が成功すると、転送が始まりました。
BigQueryテーブル作成
BigQueryのコンソールからSQLを発行してデータセットと外部テーブルを作成します。
CREATE SCHEMA sts_s3_access OPTIONS( location="us-east1" ); CREATE EXTERNAL TABLE sts_s3_access.sample_tbl ( station_number INTEGER, year INTEGER, month INTEGER, day INTEGER, fog BOOLEAN, rain BOOLEAN, snow BOOLEAN, hail BOOLEAN, thunder BOOLEAN, tornado BOOLEAN ) OPTIONS( format="JSON", compression="GZIP", uris=["gs://<Cloud Storageバケット名>/sts/bq-s3-access/*"] );
(7~10行目のカラム定義は無くてもOK。)
接続の記述が無いこと以外は先述のOmniの時のテーブル作成文と同じ構文ですね。
データセットはCloud Storageのバケットと同じリージョンである必要がありました。
使ってみる・感想
↓Cloud Storageにファイルがあれば、BigQueryのコンソールから転送されたデータを参照できました。
こちらも、BigQuery Data Transfer Serviceと同じく、 手動で実行すれば、スケジュール間隔よりも短い間隔で転送が可能なようです。 (BigQuery Data Transfer Serviceと異なり、ファイルアップロード直後でも転送対象になっていました。)
今回試した方法の中では一番手間がかかった気がします。たぶん。 その分、一番自由が利くかなといった感触です。転送間隔も短いし。
おわりに
いろいろ試してみました。 ちょっと検証に時間がかかりましたが、CloudFormationやコマンドにしておいたので、 今後楽に構築できるでしょう。たぶん。(ネットワーク周りは…)
今回はAWS側をS3に限定しましたが、他のサービスであれば他にも方法がありそうです。
【 Amazon AuroraのデータをリアルタイムにGoogle BigQueryに連携してみた / Realtime data linkage from Amazon Aurora to Google BigQuery 】
へー。
逆にBigQueryからAWS環境に連携だと、Glueのカスタムコネクタとか。
【 AWS Glue Connector for Google BigQueryを使ってBigQueryからS3にデータを転送する 】
いろいろですね。
関連情報/参考にさせていただいたページ
- BigQuery Omni
- BigQuery Data Transfer Service
- Cloud Storage Transfer Service
- BigQuery 外部テーブル
- その他